package com.example.dobs.demo.flink.realtime.report.utils;

import com.example.dobs.demo.flink.realtime.report.bean.Log;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

import static com.example.dobs.demo.flink.realtime.report.utils.MyUtils.eventMerge;

public class MyProcessFunction extends ProcessFunction<Tuple2<String, Log.Event>, Log.Event> {
    private volatile transient ValueState<Log.Event> requestState;
    private volatile transient ValueState<Log.Event> impressionState;
    private static int timeToLiveMinutes;

    @Override
    public void open(Configuration config) throws Exception {
//
//        Configuration parameters = (Configuration)
//                getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
//        timeToLiveMinutes = parameters.getInteger("time-to-live-minutes", 30);
//        timeToLiveMinutes = 30;
        timeToLiveMinutes = config.getInteger("time-to-live-minutes", 15);
//        timeToLiveMinutes = Integer.parseInt(getRuntimeContext().getExecutionConfig().getGlobalJobParameters().toMap().get("time-to-live-minutes"));
        ValueStateDescriptor<Log.Event> requestStateDescriptor = new ValueStateDescriptor<>("request state descriptor", Log.Event.class);
        ValueStateDescriptor<Log.Event> impressionStateDescriptor = new ValueStateDescriptor<>("show click state descriptor", Log.Event.class);

        impressionState = getRuntimeContext().getState(impressionStateDescriptor);
        requestState = getRuntimeContext().getState(requestStateDescriptor);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<Log.Event> out) throws Exception {
        //没有匹配到
        Log.Event request = requestState.value();
        if (request != null) {

            out.collect(request);
        }
        Log.Event impression = impressionState.value();
        if (impression != null) {
            out.collect(impression);
        }
        requestState.clear();
        impressionState.clear();
    }

    @Override
    public void processElement(Tuple2<String, Log.Event> value, Context ctx, Collector<Log.Event> out) throws Exception {

        String eventTag = value.f0;

        if (eventTag.equals("request")) {
            Log.Event request = value.f1;
            //处理重复日志
            Log.Event oldRequest = impressionState.value();
            if (oldRequest != null){
                request = eventMerge(request,oldRequest);
            }
            requestState.update(request);
            if (request.getBidWon()<1){//不可能有曝光的request
                out.collect(request);
                requestState.clear();
                return;
            }

            Log.Event impression = impressionState.value();//拿曝光
            if (impression == null){//（1）没有impression数据，继续等待
                ctx.timerService().registerEventTimeTimer(request.getTs() + timeToLiveMinutes * 60 * 1000);//定时器
            }else {
                Log.Event mergedEvent = eventMerge(request,impression);
                if (mergedEvent.getClick() > 0 && mergedEvent.getImp() > 0) {//（2）结束等待
                    out.collect(mergedEvent);
                    requestState.clear();
                    impressionState.clear();
                }else {//（3）更新requestState状态,继续等待
                    requestState.update(mergedEvent);
                    impressionState.clear();
                    ctx.timerService().registerEventTimeTimer(request.getTs() + timeToLiveMinutes * 60 * 1000);//定时器
                }
            }

        }

        if (eventTag.equals("impression")) {
            Log.Event impression = value.f1;
            //聚合日志
            Log.Event oldImpression = impressionState.value();
            if (oldImpression != null){
                impression = eventMerge(impression,oldImpression);
            }
            impressionState.update(impression);

            Log.Event request = requestState.value();
            if (request == null) {//(1)无request数据，继续等待
                ctx.timerService().registerEventTimeTimer(impression.getTs() + timeToLiveMinutes * 60 * 1000);
            }else{
                Log.Event mergedEvent = eventMerge(request, impression);
                if (mergedEvent.getClick()>0&&mergedEvent.getImp()>0){//（2）结束
                    out.collect(mergedEvent);
                    requestState.clear();
                    impressionState.clear();
                }else {// （3）更新requestState状态,继续等待
                    requestState.update(mergedEvent);
                    impressionState.clear();
                    ctx.timerService().registerEventTimeTimer(request.getTs() + timeToLiveMinutes * 60 * 1000);//定时器

                }
            }
        }

    }

}
